{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "b490915d",
   "metadata": {},
   "source": [
    "# Lifespan Events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "305342b0",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "import platform\n",
    "from pathlib import Path\n",
    "from tempfile import TemporaryDirectory\n",
    "from typing import Tuple\n",
    "\n",
    "from IPython.display import Markdown as md\n",
    "\n",
    "from fastkafka._components.helpers import _import_from_string, change_dir\n",
    "from fastkafka.testing import ApacheKafkaBroker, Tester, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ae85b1d1",
   "metadata": {},
   "source": [
    "Did you know that you can define some special code that runs before and after your Kafka application? This code will be executed just once, but it covers the whole lifespan of your app! :rocket:\n",
    "\n",
    "Let's break it down:\n",
    "\n",
    "You can define logic (code) that should be executed before the application starts up. This is like a warm-up for your app, getting it ready to consume and produce messages.\n",
    "\n",
    "Similarly, you can define logic (code) that should be executed when the application is shutting down. This is like a cool-down for your app, making sure everything is properly closed and cleaned up.\n",
    "\n",
    "By executing code before consuming and after producing, you cover the entire lifecycle of your application :tada:\n",
    "\n",
    "This is super handy for setting up shared resources that are needed across consumers and producers, like a database connection pool or a machine learning model. And the best part? You can clean up these resources when the app is shutting down!\n",
    "\n",
    "So let's give it a try and see how it can make your Kafka app even more awesome! :muscle:"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c7acf3cf",
   "metadata": {},
   "source": [
    "## Lifespan example - Iris prediction model"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "47606657",
   "metadata": {},
   "source": [
    "Let's dive into an example to see how you can leverage the lifespan handler to solve a common use case. Imagine that you have some machine learning models that need to consume incoming messages and produce response/prediction messages. These models are shared among consumers and producers, which means you don't want to load them for every message.\n",
    "\n",
    "Here's where the lifespan handler comes to the rescue! By loading the model once, right before the application starts receiving messages, you ensure that the model is ready to use without paying the loading cost for every message. In the upcoming sections, we'll walk you through how to initialize an Iris species prediction model and use it in your application."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "29f7f3b7",
   "metadata": {},
   "source": [
    "### Lifespan"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fca909ff",
   "metadata": {},
   "source": [
    "You can define this startup and shutdown logic using the lifespan parameter of the FastKafka app, and an async context manager.\n",
    "\n",
    "Let's start with an example and then see it in detail.\n",
    "\n",
    "We create an async function lifespan() with yield like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e0d6a994",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from sklearn.datasets import load_iris\n",
       "from sklearn.linear_model import LogisticRegression\n",
       "from contextlib import asynccontextmanager\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "\n",
       "ml_models = {}\n",
       "\n",
       "@asynccontextmanager\n",
       "async def lifespan(app: FastKafka):\n",
       "    # Load the ML model\n",
       "    print(\"Loading the model!\")\n",
       "    X, y = load_iris(return_X_y=True)\n",
       "    ml_models[\"iris_predictor\"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)\n",
       "    yield\n",
       "    # Clean up the ML models and release the resources\n",
       "    \n",
       "    print(\"Exiting, clearing model dict!\")\n",
       "    ml_models.clear()\n",
       "    \n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "import_lifespan = \"\"\"from sklearn.datasets import load_iris\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "from contextlib import asynccontextmanager\n",
    "\n",
    "\"\"\"\n",
    "\n",
    "import_fastkafka = \"\"\"from fastkafka import FastKafka\n",
    "\n",
    "\"\"\"\n",
    "\n",
    "lifespan = \"\"\"ml_models = {}\n",
    "\n",
    "@asynccontextmanager\n",
    "async def lifespan(app: FastKafka):\n",
    "    # Load the ML model\n",
    "    print(\"Loading the model!\")\n",
    "    X, y = load_iris(return_X_y=True)\n",
    "    ml_models[\"iris_predictor\"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)\n",
    "    yield\n",
    "    # Clean up the ML models and release the resources\n",
    "    \n",
    "    print(\"Exiting, clearing model dict!\")\n",
    "    ml_models.clear()\n",
    "    \n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{import_lifespan + import_fastkafka + lifespan}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f5ebc906",
   "metadata": {},
   "source": [
    "The first thing to notice is that we are defining an async function with `yield`. This is very similar to dependencies with `yield` in FastAPI.\n",
    "\n",
    "The first part of the function, before the `yield`, will be executed **before** the application starts.\n",
    "And the part after the `yield` will be executed **after** the application has finished.\n",
    "\n",
    "This lifespan will create an Iris prediction model on application startup and clean up the references after the app has shut down.\n",
    "\n",
    "The lifespan will be passed a `FastKafka` app reference on startup, which you can use to access your application inside the handler.\n",
    "\n",
    "For demonstration's sake, we also added prints so that when running the app we can see that our lifespan was called."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7b74c00f",
   "metadata": {},
   "source": [
    "### Async context manager\n",
    "\n",
    "Async context managers can be used in `async with` blocks. Our lifespan, for example, could be used like this:\n",
    "\n",
    "```python\n",
    "ml_models = {}\n",
    "async with lifespan(None):\n",
    "    print(ml_models)\n",
    "```\n",
    "\n",
    "When you create a context manager or an async context manager, the code before the `yield` is executed before entering the `with` block, and the code after the `yield` is executed after exiting the `with` block.\n",
    "\n",
    "If you want to learn more about context managers and contextlib decorators, please visit the [official Python docs](https://docs.python.org/3/library/contextlib.html)."
   ]
  },
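  {
   "cell_type": "markdown",
   "id": "a1c3e5f7",
   "metadata": {},
   "source": [
    "To make the ordering concrete, here is a minimal, self-contained sketch that uses only the standard library. The names `demo_lifespan` and `events` are hypothetical and exist only for this illustration; they are not part of FastKafka:\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "from contextlib import asynccontextmanager\n",
    "\n",
    "events = []  # hypothetical log used to record the execution order\n",
    "\n",
    "\n",
    "@asynccontextmanager\n",
    "async def demo_lifespan(app=None):\n",
    "    events.append(\"startup\")  # runs before the body of the `async with` block\n",
    "    try:\n",
    "        yield\n",
    "    finally:\n",
    "        events.append(\"shutdown\")  # runs after the block exits, even on error\n",
    "\n",
    "\n",
    "async def main():\n",
    "    async with demo_lifespan():\n",
    "        events.append(\"running\")\n",
    "\n",
    "\n",
    "asyncio.run(main())\n",
    "print(events)  # ['startup', 'running', 'shutdown']\n",
    "```\n",
    "\n",
    "Wrapping the `yield` in `try`/`finally` is an optional design choice in this sketch: it makes the cleanup code run even if the body of the block raises an exception."
   ]
  },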
  {
   "cell_type": "markdown",
   "id": "250d3323",
   "metadata": {},
   "source": [
    "## App demo"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a449101a",
   "metadata": {},
   "source": [
    "### FastKafka app"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6d6f4163",
   "metadata": {},
   "source": [
    "Let's now create our application using the lifespan handler we just defined.\n",
    "\n",
    "Notice how we pass our lifespan handler to the app through the `lifespan` argument when constructing it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd0ed2c8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from fastkafka import FastKafka\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"localhost\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local development kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    },\n",
       "}\n",
       "\n",
       "kafka_app = FastKafka(\n",
       "    title=\"Iris predictions\",\n",
       "    kafka_brokers=kafka_brokers,\n",
       "    lifespan=lifespan,\n",
       ")\n",
       "\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "app = \"\"\"kafka_brokers = {\n",
    "    \"localhost\": {\n",
    "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
    "        \"description\": \"local development kafka broker\",\n",
    "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
    "    },\n",
    "}\n",
    "\n",
    "kafka_app = FastKafka(\n",
    "    title=\"Iris predictions\",\n",
    "    kafka_brokers=kafka_brokers,\n",
    "    lifespan=lifespan,\n",
    ")\n",
    "\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{import_fastkafka + app}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "972ebb39",
   "metadata": {},
   "source": [
    "### Data modeling\n",
    "\n",
    "Let's model the Iris data for our app:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1ea24631",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from pydantic import BaseModel, Field, NonNegativeFloat\n",
       "\n",
       "class IrisInputData(BaseModel):\n",
       "    sepal_length: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Sepal length in cm\"\n",
       "    )\n",
       "    sepal_width: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Sepal width in cm\"\n",
       "    )\n",
       "    petal_length: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Petal length in cm\"\n",
       "    )\n",
       "    petal_width: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Petal width in cm\"\n",
       "    )\n",
       "\n",
       "\n",
       "class IrisPrediction(BaseModel):\n",
       "    species: str = Field(..., example=\"setosa\", description=\"Predicted species\")\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "import_pydantic = \"\"\"from pydantic import BaseModel, Field, NonNegativeFloat\n",
    "\n",
    "\"\"\"\n",
    "\n",
    "data_model = \"\"\"class IrisInputData(BaseModel):\n",
    "    sepal_length: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Sepal length in cm\"\n",
    "    )\n",
    "    sepal_width: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Sepal width in cm\"\n",
    "    )\n",
    "    petal_length: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Petal length in cm\"\n",
    "    )\n",
    "    petal_width: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Petal width in cm\"\n",
    "    )\n",
    "\n",
    "\n",
    "class IrisPrediction(BaseModel):\n",
    "    species: str = Field(..., example=\"setosa\", description=\"Predicted species\")\n",
    "\n",
    "\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{import_pydantic + data_model}\\n```\")"
   ]
  },
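  {
   "cell_type": "markdown",
   "id": "b2d4f6a8",
   "metadata": {},
   "source": [
    "As a quick illustration of what `NonNegativeFloat` gives us, the sketch below uses a trimmed copy of `IrisInputData` (the `Field` metadata is omitted for brevity) and shows that a negative measurement is rejected at validation time. It assumes only that `pydantic` is installed; the variables `ok` and `rejected` are hypothetical names for this example:\n",
    "\n",
    "```python\n",
    "from pydantic import BaseModel, NonNegativeFloat, ValidationError\n",
    "\n",
    "\n",
    "class IrisInputData(BaseModel):\n",
    "    sepal_length: NonNegativeFloat\n",
    "    sepal_width: NonNegativeFloat\n",
    "    petal_length: NonNegativeFloat\n",
    "    petal_width: NonNegativeFloat\n",
    "\n",
    "\n",
    "# A valid measurement is accepted\n",
    "ok = IrisInputData(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2)\n",
    "\n",
    "# A negative length fails validation\n",
    "try:\n",
    "    IrisInputData(sepal_length=-1.0, sepal_width=3.5, petal_length=1.4, petal_width=0.2)\n",
    "    rejected = False\n",
    "except ValidationError:\n",
    "    rejected = True\n",
    "\n",
    "print(ok.sepal_length, rejected)\n",
    "```\n",
    "\n",
    "Validating at the model boundary like this means the consumer function can assume well-formed input and focus on the prediction itself."
   ]
  },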
  {
   "cell_type": "markdown",
   "id": "e6ab2c5c",
   "metadata": {},
   "source": [
    "### Consumers and producers\n",
    "\n",
    "Let's create a consumer and a producer for our app that will generate predictions from input Iris data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "74aee2c5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "@kafka_app.consumes(topic=\"input_data\", auto_offset_reset=\"latest\")\n",
       "async def on_input_data(msg: IrisInputData):\n",
       "    species_class = ml_models[\"iris_predictor\"].predict(\n",
       "        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]\n",
       "    )[0]\n",
       "\n",
       "    await to_predictions(species_class)\n",
       "\n",
       "\n",
       "@kafka_app.produces(topic=\"predictions\")\n",
       "async def to_predictions(species_class: int) -> IrisPrediction:\n",
       "    iris_species = [\"setosa\", \"versicolor\", \"virginica\"]\n",
       "\n",
       "    prediction = IrisPrediction(species=iris_species[species_class])\n",
       "    return prediction\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "producers_and_consumers = \"\"\"@kafka_app.consumes(topic=\"input_data\", auto_offset_reset=\"latest\")\n",
    "async def on_input_data(msg: IrisInputData):\n",
    "    species_class = ml_models[\"iris_predictor\"].predict(\n",
    "        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]\n",
    "    )[0]\n",
    "\n",
    "    await to_predictions(species_class)\n",
    "\n",
    "\n",
    "@kafka_app.produces(topic=\"predictions\")\n",
    "async def to_predictions(species_class: int) -> IrisPrediction:\n",
    "    iris_species = [\"setosa\", \"versicolor\", \"virginica\"]\n",
    "\n",
    "    prediction = IrisPrediction(species=iris_species[species_class])\n",
    "    return prediction\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{producers_and_consumers}\\n```\")"
   ]
  },
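  {
   "cell_type": "markdown",
   "id": "c3e5a7b9",
   "metadata": {},
   "source": [
    "To see the prediction step in isolation, without Kafka, the following sketch trains the same kind of model and maps the predicted class index to a species name, mirroring what `on_input_data` and `to_predictions` do together. The sample measurements and the standalone `model` variable are assumptions made for this illustration:\n",
    "\n",
    "```python\n",
    "from sklearn.datasets import load_iris\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "\n",
    "X, y = load_iris(return_X_y=True)\n",
    "model = LogisticRegression(random_state=0, max_iter=500).fit(X, y)\n",
    "\n",
    "iris_species = [\"setosa\", \"versicolor\", \"virginica\"]\n",
    "\n",
    "# predict() expects a 2D array (one row per sample) and returns one class index per row\n",
    "species_class = model.predict([[5.1, 3.5, 1.4, 0.2]])[0]\n",
    "\n",
    "# Map the class index to a human-readable species name\n",
    "print(iris_species[species_class])\n",
    "```\n",
    "\n",
    "This is why the consumer wraps the four measurements in a nested list and takes element `[0]` of the result: it sends a single-row batch to the model and reads back the single prediction."
   ]
  },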
  {
   "cell_type": "markdown",
   "id": "e47123e0",
   "metadata": {},
   "source": [
    "### Final app\n",
    "\n",
    "The final app looks like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5969a922",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from sklearn.datasets import load_iris\n",
       "from sklearn.linear_model import LogisticRegression\n",
       "from contextlib import asynccontextmanager\n",
       "\n",
       "from pydantic import BaseModel, Field, NonNegativeFloat\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "\n",
       "class IrisInputData(BaseModel):\n",
       "    sepal_length: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Sepal length in cm\"\n",
       "    )\n",
       "    sepal_width: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Sepal width in cm\"\n",
       "    )\n",
       "    petal_length: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Petal length in cm\"\n",
       "    )\n",
       "    petal_width: NonNegativeFloat = Field(\n",
       "        ..., example=0.5, description=\"Petal width in cm\"\n",
       "    )\n",
       "\n",
       "\n",
       "class IrisPrediction(BaseModel):\n",
       "    species: str = Field(..., example=\"setosa\", description=\"Predicted species\")\n",
       "\n",
       "\n",
       "ml_models = {}\n",
       "\n",
       "@asynccontextmanager\n",
       "async def lifespan(app: FastKafka):\n",
       "    # Load the ML model\n",
       "    print(\"Loading the model!\")\n",
       "    X, y = load_iris(return_X_y=True)\n",
       "    ml_models[\"iris_predictor\"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)\n",
       "    yield\n",
       "    # Clean up the ML models and release the resources\n",
       "    \n",
       "    print(\"Exiting, clearing model dict!\")\n",
       "    ml_models.clear()\n",
       "    \n",
       "kafka_brokers = {\n",
       "    \"localhost\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local development kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    },\n",
       "}\n",
       "\n",
       "kafka_app = FastKafka(\n",
       "    title=\"Iris predictions\",\n",
       "    kafka_brokers=kafka_brokers,\n",
       "    lifespan=lifespan,\n",
       ")\n",
       "\n",
       "@kafka_app.consumes(topic=\"input_data\", auto_offset_reset=\"latest\")\n",
       "async def on_input_data(msg: IrisInputData):\n",
       "    species_class = ml_models[\"iris_predictor\"].predict(\n",
       "        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]\n",
       "    )[0]\n",
       "\n",
       "    await to_predictions(species_class)\n",
       "\n",
       "\n",
       "@kafka_app.produces(topic=\"predictions\")\n",
       "async def to_predictions(species_class: int) -> IrisPrediction:\n",
       "    iris_species = [\"setosa\", \"versicolor\", \"virginica\"]\n",
       "\n",
       "    prediction = IrisPrediction(species=iris_species[species_class])\n",
       "    return prediction\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "complete_app = (\n",
    "    import_lifespan\n",
    "    + import_pydantic\n",
    "    + import_fastkafka\n",
    "    + data_model\n",
    "    + lifespan\n",
    "    + app\n",
    "    + producers_and_consumers\n",
    ")\n",
    "md(f\"```python\\n{complete_app}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b1a3a24c",
   "metadata": {},
   "source": [
    "### Running the app"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f2509682",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "Now we can run the app with your custom lifespan handler. Copy the code above into lifespan_example.py and run it with:\n",
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker=localhost lifespan_example:kafka_app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_file = \"lifespan_example.py\"\n",
    "cmd = (\n",
    "    \"fastkafka run --num-workers=1 --kafka-broker=localhost lifespan_example:kafka_app\"\n",
    ")\n",
    "md(\n",
    "    f\"Now we can run the app with your custom lifespan handler. Copy the code above into lifespan_example.py and run it with:\\n```shell\\n{cmd}\\n```\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d7a53cc7",
   "metadata": {},
   "source": [
    "When you run the app, you should see output similar to the one below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ca8952a2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "\n",
    "async def _run_example_app(\n",
    "    *, app_example: str, bootstrap_server: str, script_file: str, cmd: str\n",
    ") -> Tuple[int, str]:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=app_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=20,\n",
    "    )\n",
    "    return exit_code, output.decode(\"UTF-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bea6a823",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=59092\n",
    ") as bootstrap_server:\n",
    "    exit_code, output = await _run_example_app(\n",
    "        app_example=complete_app,\n",
    "        bootstrap_server=bootstrap_server,\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "    )\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, output\n",
    "    assert \"Loading the model!\" in output, output\n",
    "    assert \"Exiting, clearing model dict!\" in output, output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "417e5eb3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | echo: false\n",
    "\n",
    "print(output)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2192a4f5",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "In this guide, we defined a lifespan handler and passed it to our FastKafka app.\n",
    "\n",
    "Some important points are:\n",
    "\n",
    "1. The lifespan handler is implemented as an async context manager, using [`asynccontextmanager`](https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextmanager)\n",
    "2. Code **before** `yield` in the lifespan will be executed **before** application **startup**\n",
    "3. Code **after** `yield` in the lifespan will be executed **after** application **shutdown**\n",
    "4. You can pass your lifespan handler to the FastKafka app on initialisation through the `lifespan` argument\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
